-- File to create functions and helpers needed for subsequent tests

-- create a helper function to create objects on each node
CREATE OR REPLACE FUNCTION run_command_on_master_and_workers(p_sql text)
RETURNS void
LANGUAGE plpgsql
AS $$
-- Runs p_sql on the local node first, then passes the same text to
-- run_command_on_workers(). The rows returned by that call are discarded.
BEGIN
    EXECUTE p_sql;
    PERFORM run_command_on_workers(p_sql);
END;
$$;

-- Create a function that runs a query and reports task execution failures with a fixed error message
CREATE OR REPLACE FUNCTION raise_failed_execution(query text)
RETURNS void
LANGUAGE plpgsql
AS $$
-- Executes the given query text.
-- An error whose message starts with 'failed to execute task' is re-raised
-- as 'Task failed to execute'; any other error is swallowed and the
-- function returns normally.
BEGIN
    BEGIN
        EXECUTE query;
    EXCEPTION
        WHEN OTHERS THEN
            IF SQLERRM LIKE 'failed to execute task%' THEN
                RAISE 'Task failed to execute';
            END IF;
    END;
END;
$$;

-- Create a function to ignore worker plans in explain output
CREATE OR REPLACE FUNCTION coordinator_plan(explain_command text, out query_plan text)
RETURNS SETOF TEXT
LANGUAGE plpgsql
AS $$
-- Emits the rows produced by explain_command up to and including the first
-- row that contains 'Task Count:'; every row after it is dropped.
BEGIN
    FOR query_plan IN EXECUTE explain_command LOOP
        RETURN NEXT;
        -- Stop right after the first task count line has been emitted.
        EXIT WHEN query_plan LIKE '%Task Count:%';
    END LOOP;
END;
$$;

-- Create a function to ignore worker plans in explain output
-- It also shows task count for plan and subplans
CREATE OR REPLACE FUNCTION coordinator_plan_with_subplans(explain_command text, out query_plan text)
RETURNS SETOF TEXT
LANGUAGE plpgsql
AS $$
-- Emits every row of explain_command up to and including the first row that
-- contains 'Task Count:'. After that point only further rows containing
-- 'Task Count:' are emitted.
DECLARE
    first_task_count_seen boolean := false;
    is_task_count_line boolean;
BEGIN
    FOR query_plan IN EXECUTE explain_command LOOP
        is_task_count_line := query_plan LIKE '%Task Count:%';

        IF NOT first_task_count_seen OR is_task_count_line THEN
            RETURN NEXT;
        END IF;

        IF is_task_count_line THEN
            first_task_count_seen := true;
        END IF;
    END LOOP;
END;
$$;

-- Create a function to normalize Memory Usage, Buckets, Batches
CREATE OR REPLACE FUNCTION plan_normalize_memory(explain_command text, out query_plan text)
RETURNS SETOF TEXT
LANGUAGE plpgsql
AS $$
-- Returns every row of explain_command with the value that follows
-- "Memory:", "Memory Usage:", "Buckets:" or "Batches:" replaced by "xxx".
DECLARE
    raw_line text;
BEGIN
    FOR raw_line IN EXECUTE explain_command LOOP
        query_plan := regexp_replace(raw_line, '(Memory( Usage)?|Buckets|Batches): \S*',  '\1: xxx', 'g');
        RETURN NEXT;
    END LOOP;
END;
$$;

-- helper function that returns true if output of given explain has "is not null" (case-insensitive)
CREATE OR REPLACE FUNCTION explain_has_is_not_null(explain_command text)
RETURNS BOOLEAN
LANGUAGE plpgsql
AS $$
-- Returns true as soon as one row of explain_command contains
-- "is not null" in any letter case; returns false if no row does.
DECLARE
    plan_line text;
    match_found boolean := false;
BEGIN
    FOR plan_line IN EXECUTE explain_command LOOP
        IF plan_line ILIKE '%is not null%' THEN
            match_found := true;
            EXIT;
        END IF;
    END LOOP;

    RETURN match_found;
END;
$$;

-- helper function that returns true if output of given explain has a "Distributed Subplan" line (case-insensitive)
CREATE OR REPLACE FUNCTION explain_has_distributed_subplan(explain_command text)
RETURNS BOOLEAN
LANGUAGE plpgsql
AS $$
-- Returns true if any row of explain_command contains the text
-- "Distributed Subplan " followed by an identifier that includes an
-- underscore (matched case-insensitively); returns false otherwise.
DECLARE
  query_plan text;
BEGIN
  FOR query_plan IN EXECUTE explain_command LOOP
    -- "_" is a single-character wildcard in LIKE/ILIKE, so it has to be
    -- escaped to require a literal underscore in the subplan identifier.
    IF query_plan ILIKE '%Distributed Subplan %!_%' ESCAPE '!'
    THEN
        RETURN true;
    END IF;
  END LOOP;
  RETURN false;
END;
$$;

--helper function to check there is a single task
CREATE OR REPLACE FUNCTION explain_has_single_task(explain_command text)
RETURNS BOOLEAN
LANGUAGE plpgsql
AS $$
-- Returns true if any row of explain_command reports "Task Count: 1"
-- (matched case-insensitively); returns false otherwise.
DECLARE
  query_plan text;
BEGIN
  FOR query_plan IN EXECUTE explain_command LOOP
    -- Require a non-digit (or end of line) after the 1 so that task counts
    -- such as 10 or 125 are not mistaken for a single task.
    IF query_plan ~* 'Task Count: 1([^0-9]|$)'
    THEN
        RETURN true;
    END IF;
  END LOOP;
  RETURN false;
END;
$$;

-- helper function to quickly run SQL on the whole cluster
CREATE OR REPLACE FUNCTION run_command_on_coordinator_and_workers(p_sql text)
RETURNS void
LANGUAGE plpgsql
AS $$
-- Executes p_sql on the node this function is called on, then passes the
-- same text to run_command_on_workers(); that call's result rows are ignored.
BEGIN
    EXECUTE p_sql;
    PERFORM run_command_on_workers(p_sql);
END;
$$;

-- 1. Marks the given procedure as colocated with the given table.
-- 2. Marks the argument index with which we route the procedure.
CREATE OR REPLACE FUNCTION colocate_proc_with_table(procname text, tablerelid regclass, argument_index int)
RETURNS void
LANGUAGE plpgsql
AS $$
-- Updates the pg_dist_object rows whose objid matches the oid of a pg_proc
-- entry named procname: distribution_argument_index is set to argument_index
-- and colocationid is copied from the pg_dist_partition row of tablerelid.
BEGIN
    UPDATE pg_catalog.pg_dist_object AS obj
    SET distribution_argument_index = argument_index,
        colocationid = part.colocationid
    FROM pg_proc AS proc
    CROSS JOIN pg_dist_partition AS part
    WHERE proc.proname = procname
      AND proc.oid = obj.objid
      AND part.logicalrelid = tablerelid;
END;
$$;

-- helper function to verify the function of a coordinator is the same on all workers
CREATE OR REPLACE FUNCTION verify_function_is_same_on_workers(funcname text)
    RETURNS bool
    LANGUAGE plpgsql
AS $func$
-- Compares pg_get_functiondef(funcname::regprocedure) on the local node with
-- the result of the same call on every worker (via run_command_on_workers).
-- Returns false, after raising an INFO message with both definitions, on the
-- first worker whose result differs; returns true otherwise.
DECLARE
    coordinator_def text;
    worker_def text;
BEGIN
    coordinator_def := pg_get_functiondef(funcname::regprocedure);

    FOR worker_def IN
        SELECT result
        FROM run_command_on_workers(
            'SELECT pg_get_functiondef(' || quote_literal(funcname) || '::regprocedure)')
    LOOP
        -- IS DISTINCT FROM treats a NULL on either side as a difference;
        -- a plain != would yield NULL and let the mismatch pass unnoticed.
        IF worker_def IS DISTINCT FROM coordinator_def THEN
            RAISE INFO 'functions are different, coordinator:% worker:%', coordinator_def, worker_def;
            RETURN false;
        END IF;
    END LOOP;

    RETURN true;
END;
$func$;

--
-- Procedure for creating shards for range partitioned distributed table.
--
CREATE OR REPLACE PROCEDURE create_range_partitioned_shards(rel regclass, minvalues text[], maxvalues text[])
LANGUAGE plpgsql
AS $$
-- For each position 1..array_length(minvalues, 1): creates an empty shard for
-- rel with master_create_empty_shard() and stores minvalues[i] / maxvalues[i]
-- as that shard's shardminvalue / shardmaxvalue in pg_dist_shard.
DECLARE
    range_no int;
    created_shardid bigint;
BEGIN
    FOR range_no IN
        SELECT n FROM generate_series(1, array_length(minvalues, 1)) AS n
    LOOP
        created_shardid := master_create_empty_shard(rel::text);

        UPDATE pg_dist_shard
        SET shardminvalue = minvalues[range_no],
            shardmaxvalue = maxvalues[range_no]
        WHERE shardid = created_shardid;
    END LOOP;
END;
$$;

-- Introduce a function that waits until all cleanup records are deleted, for testing purposes
CREATE OR REPLACE FUNCTION wait_for_resource_cleanup() RETURNS void
SET client_min_messages TO ERROR
LANGUAGE plpgsql
AS $$
-- Keeps calling pg_catalog.citus_cleanup_orphaned_resources() until
-- pg_catalog.pg_dist_cleanup contains no rows. There is no pause between
-- iterations and no upper bound on their number.
DECLARE
    pending_records integer;
BEGIN
    LOOP
        EXECUTE 'SELECT COUNT(*) FROM pg_catalog.pg_dist_cleanup' INTO pending_records;
        EXIT WHEN pending_records = 0;
        CALL pg_catalog.citus_cleanup_orphaned_resources();
    END LOOP;
END;
$$;

-- Returns the foreign keys where the referencing relation's name starts with
-- given prefix.
--
-- Foreign keys are grouped by their configurations and then the constraint name,
-- referencing table, and referenced table for each distinct configuration are
-- aggregated into arrays.
CREATE OR REPLACE FUNCTION get_grouped_fkey_constraints(referencing_relname_prefix text)
RETURNS jsonb AS $func$
  -- Returns a jsonb array with one element per distinct foreign key
  -- configuration (columns, deferrability, update/delete actions, match type,
  -- SET NULL/DEFAULT column list) found among the foreign keys whose
  -- referencing relation name starts with referencing_relname_prefix.
  -- Each element carries the constraint names and the referencing/referenced
  -- table names that share the configuration, ordered by constraint oid.
  -- Returns NULL when no foreign key matches.
  DECLARE
    confdelsetcols_column_ref text;
    result jsonb;
  BEGIN
    -- Read confdelsetcols as null if no such column exists.
    -- This can only be the case for PG versions < 15.
    IF EXISTS (SELECT 1 FROM pg_attribute WHERE attrelid = 'pg_constraint'::regclass AND attname='confdelsetcols')
    THEN
      confdelsetcols_column_ref := '(SELECT array_agg(attname ORDER BY attnum) FROM pg_attribute WHERE attrelid = conrelid AND attnum = ANY(confdelsetcols))';
    ELSE
      confdelsetcols_column_ref := '(SELECT null::smallint[])';
    END IF;

    -- %1$L embeds the prefix as a properly quoted SQL literal, so a prefix
    -- containing a quote character cannot break the generated query.
    -- %2$s splices in the SQL fragment chosen above.
    EXECUTE format(
      $$
      SELECT jsonb_agg(to_jsonb(q1.*) ORDER BY q1.constraint_names) AS fkeys_with_different_config FROM (
        SELECT array_agg(constraint_name ORDER BY constraint_oid) AS constraint_names,
               array_agg(referencing_table::regclass::text ORDER BY constraint_oid) AS referencing_tables,
               array_agg(referenced_table::regclass::text ORDER BY constraint_oid) AS referenced_tables,
               referencing_columns, referenced_columns, deferable, deferred, on_update, on_delete, match_type, referencing_columns_set_null_or_default
        FROM (
          SELECT
            oid AS constraint_oid,
            conname AS constraint_name,
            conrelid AS referencing_table,
            (SELECT array_agg(attname ORDER BY attnum) FROM pg_attribute WHERE attrelid = conrelid AND attnum = ANY(conkey)) AS referencing_columns,
            confrelid AS referenced_table,
            (SELECT array_agg(attname ORDER BY attnum) FROM pg_attribute WHERE attrelid = confrelid AND attnum = ANY(confkey)) AS referenced_columns,
            condeferrable AS deferable,
            condeferred AS deferred,
            confupdtype AS on_update,
            confdeltype AS on_delete,
            confmatchtype AS match_type,
            %2$s AS referencing_columns_set_null_or_default
          FROM pg_constraint WHERE starts_with(conrelid::regclass::text, %1$L) AND contype = 'f'
        ) q2
        GROUP BY referencing_columns, referenced_columns, deferable, deferred, on_update, on_delete, match_type, referencing_columns_set_null_or_default
      ) q1
      $$,
      referencing_relname_prefix,
      confdelsetcols_column_ref
    ) INTO result;
    RETURN result;
  END;
$func$ LANGUAGE plpgsql;

CREATE OR REPLACE FUNCTION get_index_defs(schemaname text, tablename text)
RETURNS jsonb AS $func$
  -- Returns a jsonb array describing the indexes in schema schemaname on
  -- tables whose name starts with tablename. Indexes are grouped by their
  -- pg_index properties; each element holds the names and definitions of the
  -- indexes sharing one set of properties, ordered by index oid.
  -- Returns NULL when no index matches.
  DECLARE
    result jsonb;
    indnullsnotdistinct_column_ref text;
  BEGIN
    -- Not use indnullsnotdistinct in group by clause if no such column exists.
    -- This can only be the case for PG versions < 15.
    IF EXISTS (SELECT 1 FROM pg_attribute WHERE attrelid = 'pg_index'::regclass AND attname='indnullsnotdistinct')
    THEN
      indnullsnotdistinct_column_ref := ',indnullsnotdistinct';
    ELSE
      indnullsnotdistinct_column_ref := '';
    END IF;

    -- Inside the query text, schemaname/indexname/tablename are pg_indexes
    -- columns. quote_ident() makes the regclass cast work for names that need
    -- quoting (upper case, spaces, ...). %1$L / %2$L embed the arguments as
    -- quoted literals; %3$s splices in the optional GROUP BY column.
    EXECUTE format(
      $$
      SELECT jsonb_agg(to_jsonb(q1.*) ORDER BY q1.indexnames) AS index_defs FROM (
        SELECT array_agg(indexname ORDER BY indexrelid) AS indexnames,
               array_agg(indexdef ORDER BY indexrelid) AS indexdefs
        FROM pg_indexes
        JOIN pg_index
        ON (indexrelid = (quote_ident(schemaname) || '.' || quote_ident(indexname))::regclass)
        WHERE schemaname = %1$L AND starts_with(tablename, %2$L)
        GROUP BY indnatts, indnkeyatts, indisunique, indisprimary, indisexclusion,
                 indimmediate, indisclustered, indisvalid, indisready, indislive,
                 indisreplident, indkey, indcollation, indclass, indoption, indexprs,
                 indpred %3$s
      ) q1
      $$,
      schemaname, tablename, indnullsnotdistinct_column_ref) INTO result;
    RETURN result;
  END;
$func$ LANGUAGE plpgsql;

CREATE OR REPLACE FUNCTION get_column_defaults(schemaname text, tablename text)
RETURNS jsonb AS $func$
  -- Returns a jsonb array with one element per column of
  -- schemaname.tablename that has a default or a generation expression
  -- (column_name, column_default, generation_expression as reported by
  -- information_schema.columns), ordered by column name.
  -- Returns NULL when the table has no such column.
  DECLARE
    result jsonb;
  BEGIN
    -- The OR is parenthesized on purpose: AND binds tighter than OR, so
    -- without the parentheses generated columns of every table would be
    -- returned regardless of the schema/table filter.
    -- %1$L / %2$L embed the arguments as quoted literals.
    EXECUTE format(
      $$
      SELECT jsonb_agg(to_jsonb(q1.*) ORDER BY q1.column_name) AS column_defs FROM (
        SELECT column_name, column_default::text, generation_expression::text
        FROM information_schema.columns
        WHERE table_schema = %1$L AND table_name = %2$L AND
              (column_default IS NOT NULL OR generation_expression IS NOT NULL)
      ) q1
      $$,
      schemaname, tablename) INTO result;
    RETURN result;
  END;
$func$ LANGUAGE plpgsql;

CREATE OR REPLACE FUNCTION get_column_attrs(relname_prefix text)
RETURNS jsonb AS $func$
  -- Collects name, type, collation, compression method and NOT NULL flag of
  -- the live user columns (attnum > 0, not dropped) of ordinary tables
  -- (relkind 'r') whose name starts with relname_prefix. Columns with
  -- identical attributes are grouped and the tables sharing them are listed in
  -- "relnames"; the column descriptions are returned under "column_attrs".
  --
  -- NOTE(review): the query yields one row per distinct "relnames" group but
  -- EXECUTE ... INTO keeps only the first row, so the result is complete only
  -- when all matching tables have the same column attributes.
  DECLARE
    result jsonb;
  BEGIN
    -- %1$L embeds the prefix as a properly quoted SQL literal, so a prefix
    -- containing a quote character cannot break the generated query.
    EXECUTE format(
      $$
      SELECT to_jsonb(q2.*) FROM (
        SELECT relnames, jsonb_agg(to_jsonb(q1.*) - 'relnames' ORDER BY q1.column_name) AS column_attrs FROM (
          SELECT array_agg(attrelid::regclass::text ORDER BY attrelid) AS relnames,
                 attname AS column_name, typname AS type_name, collname AS collation_name, attcompression AS compression_method, attnotnull AS not_null
          FROM pg_attribute pa
          LEFT JOIN pg_type pt ON (pa.atttypid = pt.oid)
          LEFT JOIN pg_collation pc1 ON (pa.attcollation = pc1.oid)
          JOIN pg_class pc2 ON (pa.attrelid = pc2.oid)
          WHERE starts_with(attrelid::regclass::text, %1$L) AND
                attnum > 0 AND NOT attisdropped AND relkind = 'r'
          GROUP BY column_name, type_name, collation_name, compression_method, not_null
        ) q1
        GROUP BY relnames
      ) q2
      $$,
      relname_prefix) INTO result;
    RETURN result;
  END;
$func$ LANGUAGE plpgsql;

-- Returns true if all shard placements of given table have given number of indexes.
CREATE OR REPLACE FUNCTION verify_index_count_on_shard_placements(
    qualified_table_name text,
    n_expected_indexes int)
RETURNS BOOLEAN
LANGUAGE plpgsql
AS $func$
-- Counts, through run_command_on_placements(), the pg_index rows whose
-- indrelid is the shard relation on every placement of the given table and
-- returns whether each of those counts equals n_expected_indexes.
BEGIN
    RETURN n_expected_indexes = ALL (
        SELECT result::int
        FROM run_command_on_placements(
            qualified_table_name,
            $$SELECT COUNT(*) FROM pg_index WHERE indrelid::regclass = '%s'::regclass$$
        )
    );
END;
$func$;

-- Returns names of the foreign keys that shards of given table are involved in
-- (as referencing or referenced one).
CREATE OR REPLACE FUNCTION get_fkey_names_on_placements(
    qualified_table_name text)
RETURNS TABLE (
    on_node text,
    shard_id bigint,
    fkey_names text[]
)
LANGUAGE plpgsql
AS $func$
-- For every placement of the given table, returns:
--   on_node    - 'on_coordinator' when the node's groupid is 0, else 'on_worker'
--   shard_id   - shard id of the placement
--   fkey_names - sorted names of the foreign keys in which the shard relation
--                is the referencing or the referenced side; an empty result
--                string from the placement is reported as an empty array
BEGIN
    RETURN QUERY
    SELECT
        CASE dn.groupid WHEN 0 THEN 'on_coordinator' ELSE 'on_worker' END,
        plc.shardid,
        (CASE WHEN plc.result = '' THEN '{}' ELSE plc.result END)::text[]
    FROM run_command_on_placements(
        qualified_table_name,
        $$SELECT array_agg(conname ORDER BY conname) FROM pg_constraint WHERE '%s'::regclass IN (conrelid, confrelid) AND contype = 'f'$$
    ) AS plc
    JOIN pg_dist_node AS dn USING (nodename, nodeport);
END;
$func$;

-- Returns true if all shard placements of given table have given number of partitions.
CREATE OR REPLACE FUNCTION verify_partition_count_on_placements(
    qualified_table_name text,
    n_expected_partitions int)
RETURNS BOOLEAN
LANGUAGE plpgsql
AS $func$
-- Counts, through run_command_on_placements(), the pg_inherits rows whose
-- inhparent is the shard relation on every placement of the given table and
-- returns whether each of those counts equals n_expected_partitions.
BEGIN
    RETURN n_expected_partitions = ALL (
        SELECT result::int
        FROM run_command_on_placements(
            qualified_table_name,
            $$SELECT COUNT(*) FROM pg_inherits WHERE inhparent = '%s'::regclass;$$
        )
    );
END;
$func$;

-- This function checks pg_dist_placement on all nodes and returns true if the following holds:
--   Whether shard is on the coordinator or on a primary worker node, and if this is expected.
--   Given shardid is used for shard placement of the table.
--   Placement metadata is correct on all nodes.
CREATE OR REPLACE FUNCTION verify_shard_placement_for_single_shard_table(
    qualified_table_name text,
    expected_shard_id bigint,
    expect_placement_on_coord boolean)
RETURNS BOOLEAN
LANGUAGE plpgsql
AS $func$
DECLARE
    placement_node record;
    workers_check_sql text;
    workers_agree boolean;
BEGIN
    -- Step 1: find, in the local metadata, the active primary node that
    -- should have shards and holds the placement of the expected shard.
    SELECT nodename, nodeport, groupid
    INTO placement_node
    FROM pg_dist_shard
    JOIN pg_dist_placement USING (shardid)
    JOIN pg_dist_node USING (groupid)
    WHERE noderole = 'primary' AND shouldhaveshards AND isactive AND
          logicalrelid = qualified_table_name::regclass AND shardid = expected_shard_id;

    IF placement_node IS NULL THEN
        RAISE NOTICE 'Shard placement is not on a primary worker node';
        RETURN false;
    END IF;

    -- Step 2: groupid 0 denotes the coordinator; compare with the expectation.
    IF (placement_node.groupid = 0) != expect_placement_on_coord THEN
        RAISE NOTICE 'Shard placement is on an unexpected node';
        RETURN false;
    END IF;

    -- Step 3: every worker must have exactly one matching row in its own
    -- metadata for the same table, shard, node name, port and group.
    workers_check_sql := format(
        'SELECT true = ALL(
            SELECT result::boolean FROM run_command_on_workers($$
                SELECT COUNT(*) = 1
                FROM pg_dist_shard
                JOIN pg_dist_placement USING (shardid)
                JOIN pg_dist_node USING (groupid)
                WHERE logicalrelid = ''%s''::regclass AND
                      shardid = %s AND
                      nodename = ''%s'' AND
                      nodeport = %s AND
                      groupid = %s
            $$)
        );',
        qualified_table_name, expected_shard_id,
        placement_node.nodename,
        placement_node.nodeport,
        placement_node.groupid
    );

    EXECUTE workers_check_sql INTO workers_agree;
    RETURN workers_agree;
END;
$func$;

-- This function checks pg_dist_placement on all nodes and returns true if the following holds:
--   Shard placement exist on coordinator and on all primary worker nodes.
--   Given shardid is used for shard placements of the table.
--   Given placementid is used for the coordinator shard placement.
--   Placement metadata is correct on all nodes.
CREATE OR REPLACE FUNCTION verify_shard_placements_for_reference_table(
    qualified_table_name text,
    expected_shard_id bigint,
    expected_coord_placement_id bigint)
RETURNS BOOLEAN
LANGUAGE plpgsql
AS $func$
DECLARE
    all_nodes_check_sql text;
    all_nodes_agree boolean;
BEGIN
    -- The generated query runs on every node (run_command_on_all_nodes) and
    -- checks two things in that node's metadata:
    --   * the number of active primary nodes equals the number of placements
    --     of the expected shard on active primary nodes, and
    --   * exactly one such placement has the expected placement id and
    --     groupid 0.
    -- The function returns true only if all nodes report true.
    all_nodes_check_sql := format(
        'SELECT true = ALL(
            SELECT result::boolean FROM run_command_on_all_nodes($$
                SELECT
                        (SELECT COUNT(*) FROM pg_dist_node WHERE noderole = ''primary'' AND isactive) =
                        (SELECT COUNT(*)
                         FROM pg_dist_shard
                         JOIN pg_dist_placement USING (shardid)
                         JOIN pg_dist_node USING (groupid)
                         WHERE noderole = ''primary'' AND isactive AND
                               logicalrelid = ''%s''::regclass AND shardid = %s)
                    AND
                        (SELECT COUNT(*) = 1
                         FROM pg_dist_shard
                         JOIN pg_dist_placement USING (shardid)
                         JOIN pg_dist_node USING (groupid)
                         WHERE noderole = ''primary'' AND isactive AND
                               logicalrelid = ''%s''::regclass AND shardid = %s AND
                               placementid = %s AND groupid = 0)

            $$)
        );',
        qualified_table_name, expected_shard_id,
        qualified_table_name, expected_shard_id,
        expected_coord_placement_id
    );

    EXECUTE all_nodes_check_sql INTO all_nodes_agree;
    RETURN all_nodes_agree;
END;
$func$;

-- This function checks pg_dist_partition on all nodes and returns true if the metadata
-- record for given single-shard table is correct.
CREATE OR REPLACE FUNCTION verify_pg_dist_partition_for_single_shard_table(
    qualified_table_name text)
RETURNS BOOLEAN
LANGUAGE plpgsql
AS $func$
DECLARE
    all_nodes_check_sql text;
    all_nodes_agree boolean;
BEGIN
    -- On every node there must be exactly one pg_dist_partition row for the
    -- table with partmethod 'n', no partition key, a positive colocation id,
    -- repmodel 's' and autoconverted = false.
    all_nodes_check_sql := format(
        'SELECT true = ALL(
            SELECT result::boolean FROM run_command_on_all_nodes($$
                SELECT COUNT(*) = 1
                FROM pg_dist_partition
                WHERE logicalrelid = ''%s''::regclass AND
                      partmethod = ''n'' AND
                      partkey IS NULL AND
                      colocationid > 0 AND
                      repmodel = ''s'' AND
                      autoconverted = false
            $$)
        );',
        qualified_table_name
    );

    EXECUTE all_nodes_check_sql INTO all_nodes_agree;
    RETURN all_nodes_agree;
END;
$func$;

-- This function checks pg_dist_partition on all nodes and returns true if the metadata
-- record for given reference table is correct.
CREATE OR REPLACE FUNCTION verify_pg_dist_partition_for_reference_table(
    qualified_table_name text)
RETURNS BOOLEAN
LANGUAGE plpgsql
AS $func$
DECLARE
    all_nodes_check_sql text;
    all_nodes_agree boolean;
BEGIN
    -- On every node there must be exactly one pg_dist_partition row for the
    -- table with partmethod 'n', no partition key, a positive colocation id,
    -- repmodel 't' and autoconverted = false.
    all_nodes_check_sql := format(
        'SELECT true = ALL(
            SELECT result::boolean FROM run_command_on_all_nodes($$
                SELECT COUNT(*) = 1
                FROM pg_dist_partition
                WHERE logicalrelid = ''%s''::regclass AND
                      partmethod = ''n'' AND
                      partkey IS NULL AND
                      colocationid > 0 AND
                      repmodel = ''t'' AND
                      autoconverted = false
            $$)
        );',
        qualified_table_name
    );

    EXECUTE all_nodes_check_sql INTO all_nodes_agree;
    RETURN all_nodes_agree;
END;
$func$;

-- Rewrites every pre-PG17 InitPlan output reference "$N" in the given text to
-- the PG17 form "(InitPlan N+1).col1" and returns the rewritten text.
-- Text without any "$N" reference is returned unchanged.
CREATE OR REPLACE FUNCTION initplan_references_to_pg17(text) RETURNS text AS $$
DECLARE
  expr_parts text[];
  initplan_refs text[];
  n_initplan_refs int := 0;
  i int := 1;
  rv text := '';
  expr_part text;
BEGIN
  -- Split the line on each $x.
  -- For example 'foo = $0 and bar < $1' is split to: [ 'foo = ', ' and bar < ', '' ]
  expr_parts := regexp_split_to_array($1, '\$\d+');

  -- Construct the PG17 formatted names in the given text, for example
  -- 'foo = $0 and bar < $1' yields [ '(InitPlan 1).col1', '(InitPlan 2).col1' ].
  -- The pattern must be the same multi-digit pattern used for the split above;
  -- otherwise '$10' would be read as '$1' and mapped to the wrong InitPlan.
  initplan_refs := ARRAY(
    SELECT '(InitPlan ' || (substr(m[1], 2)::int + 1) || ').col1'
    FROM regexp_matches($1, '\$\d+', 'g') AS m
  );
  n_initplan_refs := array_length(initplan_refs, 1);

  -- Combine expression parts with PG17 formatted names
  FOREACH expr_part IN ARRAY expr_parts
  LOOP
    rv := rv || expr_part;
    -- There is one more expr part than there are init plan refs, so
    -- check the init plan refs boundary each time
    IF i <= n_initplan_refs THEN
      rv := rv || initplan_refs[i];
    END IF;
    i := i + 1;
  END LOOP;
  RETURN rv;
END;
$$ LANGUAGE plpgsql;

-- This function formats EXPLAIN output to conform to how PG17 EXPLAIN shows
-- scalar subquery outputs if the pg version is less than 17 (*). When 17
-- becomes the minimum supported pgversion this function can be retired.
--
-- (*) https://git.postgresql.org/gitweb/?p=postgresql.git;a=commitdiff;h=fd0398fcb
CREATE OR REPLACE FUNCTION explain_with_pg17_initplan_format(explain_command text, out query_plan text)
RETURNS SETOF TEXT AS $$
-- Returns the rows of explain_command. When the server major version is
-- below 17, InitPlan related text is rewritten to the PG17 style; on newer
-- servers the rows are returned unchanged.
DECLARE
  pgversion int := 0;
BEGIN
  -- First run of digits in version() is the server major version
  pgversion := substring(version(), '\d+')::int;
  FOR query_plan IN EXECUTE explain_command LOOP
    IF pgversion < 17 THEN
      -- Two types of format changes are needed:
      -- 1) the '(returns $0)' suffix of an 'InitPlan 1 (returns $0)' line is removed
      -- 2) 'foo = $0' becomes 'foo = (InitPlan 1).col1'
      -- The patterns use \d+ so that InitPlan numbers and parameter numbers
      -- with more than one digit are handled as well.
      IF query_plan ~ 'InitPlan \d+ \(returns' THEN
        query_plan := regexp_replace(query_plan, '\(returns \$\d+\)', '', 'g');
      ELSIF query_plan ~ '\$\d+' THEN
        -- This line contains at least one InitPlan reference
        -- Replace it to have PG17 style InitPlan references
        query_plan := public.initplan_references_to_pg17(query_plan);
      END IF;
    END IF;
    RETURN NEXT;
  END LOOP;
END; $$ language plpgsql;

-- This function formats EXPLAIN output to conform to how pg <= 16 EXPLAIN
-- shows ANY <subquery> in an expression when the pg version is >= 17. When 17
-- is the minimum supported pgversion this function can be retired. The commit
-- that changed how ANY <subquery> expressions appear in EXPLAIN is:
-- https://git.postgresql.org/gitweb/?p=postgresql.git;a=commitdiff;h=fd0398fcb
CREATE OR REPLACE FUNCTION explain_with_pg16_subplan_format(explain_command text, out query_plan text)
RETURNS SETOF TEXT
LANGUAGE plpgsql
AS $$
-- Returns the rows of explain_command. When the server major version is 17
-- or above, "(ANY (<column> = (SubPlan N).col1))" is shortened to
-- "(SubPlan N)"; on older servers the rows are returned unchanged.
DECLARE
    server_major int := 0;
BEGIN
    -- First run of digits in version() is the server major version
    server_major := substring(version(), '\d+')::int;

    FOR query_plan IN EXECUTE explain_command LOOP
        IF server_major >= 17 AND query_plan ~ 'SubPlan \d+\).col' THEN
            query_plan := regexp_replace(query_plan, '\(ANY \(\w+ = \(SubPlan (\d+)\).col1\)\)', '(SubPlan \1)', 'g');
        END IF;
        RETURN NEXT;
    END LOOP;
END;
$$;

-- To produce stable regression test output, it's usually necessary to
-- ignore details such as exact costs or row counts.  These filter
-- functions replace changeable output details with fixed strings.
-- Copied from PG explain.sql

-- Runs the command given as $1 and returns its output lines with volatile
-- details masked. CREATE OR REPLACE keeps this file re-runnable, like the
-- other helper definitions in it.
create or replace function explain_filter(text) returns setof text
language plpgsql as
$$
declare
    ln text;
begin
    for ln in execute $1
    loop
        -- Replace any numeric word with just 'N'
        ln := regexp_replace(ln, '-?\m\d+\M', 'N', 'g');
        -- In sort output, the above won't match units-suffixed numbers
        ln := regexp_replace(ln, '\m\d+kB', 'NkB', 'g');
        -- Ignore text-mode buffers output because it varies depending
        -- on the system state
        CONTINUE WHEN (ln ~ ' +Buffers: .*');
        -- Ignore text-mode "Planning:" line because whether it's output
        -- varies depending on the system state
        CONTINUE WHEN (ln = 'Planning:');
        return next ln;
    end loop;
end;
$$;
-- Returns pg_seclabels entries from all nodes in the cluster for which
-- the object name is the input.
CREATE OR REPLACE FUNCTION get_citus_tests_label_provider_labels(object_name text,
                                                                 master_port INTEGER DEFAULT 57636,
                                                                 worker_1_port INTEGER DEFAULT 57637,
                                                                 worker_2_port INTEGER DEFAULT 57638)
RETURNS TABLE (
    node_type text,
    result text
)
AS $func$
-- Runs a pg_seclabels lookup for object_name on every node through
-- run_command_on_all_nodes() and returns one row per node:
--   node_type - 'coordinator', 'worker_1' or 'worker_2' depending on which of
--               the given ports the node listens on, else 'unexpected_node'
--   result    - the text returned by the node for the lookup
-- Rows are ordered by node_type.
DECLARE
    -- quote_literal() embeds object_name as a properly escaped SQL literal,
    -- so names containing a quote character do not break the command.
    pg_seclabels_cmd TEXT := 'SELECT to_jsonb(q.*) FROM (' ||
                             'SELECT provider, objtype, label FROM pg_seclabels ' ||
                             'WHERE objname = ' || quote_literal(object_name) || ') q';
BEGIN
    RETURN QUERY
    SELECT
        CASE
            WHEN nodeport = master_port THEN 'coordinator'
            WHEN nodeport = worker_1_port THEN 'worker_1'
            WHEN nodeport = worker_2_port THEN 'worker_2'
            ELSE 'unexpected_node'
        END AS node_type,
        a.result
    FROM run_command_on_all_nodes(pg_seclabels_cmd) a
    JOIN pg_dist_node USING (nodeid)
    ORDER BY node_type;
END;
$func$ LANGUAGE plpgsql;
